package esl

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"gitee.com/zackeus/go-boot/freeswitch/esl/command"
	"gitee.com/zackeus/go-boot/freeswitch/esl/events"
	"gitee.com/zackeus/go-boot/freeswitch/esl/headers"
	"gitee.com/zackeus/go-boot/freeswitch/esl/log"
	"gitee.com/zackeus/go-boot/freeswitch/esl/response"
	"gitee.com/zackeus/go-zero/core/trace"
	"github.com/panjf2000/ants/v2"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	oteltrace "go.opentelemetry.io/otel/trace"
	"io"
	"net"
	"net/textproto"
	"strconv"
	"sync"
	"time"
)

const (
	spanName     = "esl.core"
	EndOfMessage = "\r\n\r\n"
)

var (
	closedErr          = errors.New("connection closed")
	eslCmdAttributeKey = attribute.Key("esl.cmd")
)

// Config - ESL 连接通用选项 (inbound or outbound)
type Config struct {
	eventConcurrentSize uint64        // 事件并发大小
	exitTimeout         time.Duration // "exit" 命令超时时长 默认 5s
	eventListener       events.EventListener
}

type Core struct {
	ctx               context.Context
	conn              net.Conn
	reader            *bufio.Reader
	header            *textproto.Reader
	writeLock         sync.Mutex
	stopFunc          func()
	responseChannels  map[string]chan *response.RawResponse
	responseChanMutex sync.RWMutex
	eventListenerLock sync.RWMutex
	eventListener     events.EventListener
	exitTimeout       time.Duration
	eventPool         *ants.Pool
	closeOnce         sync.Once
}

func NewCore(c net.Conn, cfg *Config) (*Core, error) {
	reader := bufio.NewReader(c)
	header := textproto.NewReader(reader)

	/* 构建阻塞的 goroutine pool */
	pool, err := ants.NewPool(int(cfg.eventConcurrentSize),
		ants.WithNonblocking(false),
		ants.WithPreAlloc(false),
		/* 清理周期 */
		ants.WithExpiryDuration(1*time.Hour),
		/* 定期清理携程 */
		ants.WithDisablePurge(false),
	)
	if err != nil {
		return nil, err
	}
	/* 构建上下文 */
	ctx, stop := context.WithCancel(context.Background())

	instance := &Core{
		ctx:    ctx,
		conn:   c,
		reader: reader,
		header: header,
		responseChannels: map[string]chan *response.RawResponse{
			response.TypeReply:       make(chan *response.RawResponse),
			response.TypeAPIResponse: make(chan *response.RawResponse),
			response.TypeEventPlain:  make(chan *response.RawResponse),
			response.TypeEventXML:    make(chan *response.RawResponse),
			response.TypeEventJSON:   make(chan *response.RawResponse),
			/* 设置缓冲以确保在设置响应之前不会丢失 */
			response.TypeRudeRejection: make(chan *response.RawResponse, 1),
			response.TypeAuthRequest:   make(chan *response.RawResponse, 1),
			response.TypeDisconnect:    make(chan *response.RawResponse, 1),
		},
		stopFunc:      stop,
		eventListener: cfg.eventListener,
		eventPool:     pool,
		exitTimeout:   cfg.exitTimeout,
	}
	go instance.receiveLoop()
	go instance.eventLoop()
	return instance, nil
}

// IsActive 是否存活
func (c *Core) IsActive() bool {
	/* 执行 stopFunc c.ctx.Err() 会返回 context canceled */
	return c.ctx != nil && c.ctx.Err() == nil
}

// send - Sends the specified ESL command to FreeSWITCH with the provided context. Returns the response data and any errors encountered.
func (c *Core) send(ctx context.Context, command command.Command) (*response.RawResponse, error) {
	c.writeLock.Lock()
	defer c.writeLock.Unlock()

	if deadline, ok := ctx.Deadline(); ok {
		_ = c.conn.SetWriteDeadline(deadline)
	} else {
		/* 默认3秒超时 */
		_ = c.conn.SetWriteDeadline(time.Now().Add(3 * time.Second))
	}

	/* 链路追踪 */
	tracer := trace.TracerFromContext(ctx)
	ctx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindClient))
	span.SetAttributes(eslCmdAttributeKey.String(command.BuildCmd()))
	defer span.End()

	/* EndOfMessage 结尾标符 */
	if _, err := c.conn.Write([]byte(command.BuildMessage() + EndOfMessage)); err != nil {
		return nil, err
	}

	// Get response
	c.responseChanMutex.RLock()
	defer c.responseChanMutex.RUnlock()
	select {
	case res, ok := <-c.responseChannels[response.TypeReply]:
		if !ok {
			/* channel is closed */
			span.SetStatus(codes.Error, closedErr.Error())
			span.RecordError(closedErr)
			return nil, closedErr
		}
		span.SetStatus(codes.Ok, "")
		return res, nil
	case res, ok := <-c.responseChannels[response.TypeAPIResponse]:
		if !ok {
			/* channel is closed */
			span.SetStatus(codes.Error, closedErr.Error())
			span.RecordError(closedErr)
			return nil, closedErr
		}
		span.SetStatus(codes.Ok, "")
		return res, nil
	case <-ctx.Done():
		err := ctx.Err()
		span.SetStatus(codes.Error, err.Error())
		span.RecordError(err)
		return nil, err
	}
}

func (c *Core) callEventListener(event *events.Event) {
	c.eventListenerLock.RLock()
	defer c.eventListenerLock.RUnlock()

	if c.eventListener != nil {
		_ = c.eventPool.Submit(func() {
			c.eventListener(event)
		})
	}
}

// TODO: Needs processing
func (c *Core) readXMLEvent(body []byte) (*events.Event, error) {
	return &events.Event{
		Headers: make(textproto.MIMEHeader),
	}, nil
}

// TODO: Needs processing
func (c *Core) readJSONEvent(body []byte) (*events.Event, error) {
	return &events.Event{
		Headers: make(textproto.MIMEHeader),
	}, nil
}

func (c *Core) readPlainEvent(body []byte) (*events.Event, error) {
	reader := bufio.NewReader(bytes.NewBuffer(body))
	header := textproto.NewReader(reader)

	h, err := header.ReadMIMEHeader()
	if err != nil {
		return nil, err
	}
	event := &events.Event{
		Headers: h,
	}

	if contentLength := h.Get(headers.ContentLength); len(contentLength) > 0 {
		length, err := strconv.Atoi(contentLength)
		if err != nil {
			return event, err
		}
		event.Body = make([]byte, length)
		_, err = io.ReadFull(reader, event.Body)
		if err != nil {
			return event, err
		}
	}

	return event, nil
}

func (c *Core) eventLoop() {
	for {
		var event *events.Event
		var err error

		c.responseChanMutex.RLock()
		select {
		case raw, ok := <-c.responseChannels[response.TypeEventPlain]:
			if !ok {
				/* 通道关闭 */
				c.responseChanMutex.RUnlock()
				return
			}
			event, err = c.readPlainEvent(raw.Body)
		case raw, ok := <-c.responseChannels[response.TypeEventXML]:
			if !ok {
				/* 通道关闭 */
				c.responseChanMutex.RUnlock()
				return
			}
			event, err = c.readXMLEvent(raw.Body)
		case raw, ok := <-c.responseChannels[response.TypeEventJSON]:
			if !ok {
				/* 通道关闭 */
				c.responseChanMutex.RUnlock()
				return
			}
			event, err = c.readJSONEvent(raw.Body)
		case <-c.ctx.Done():
			c.responseChanMutex.RUnlock()
			return
		}
		c.responseChanMutex.RUnlock()

		if err != nil {
			log.Warn("Error parsing event\n%s\n", err.Error())
			continue
		}
		if event == nil {
			continue
		}
		/* 事件通知 */
		c.callEventListener(event)
	}
}

func (c *Core) receiveLoop() {
	for {
		select {
		case <-c.ctx.Done():
			return
		default:
			if err := c.doMessage(); err != nil {
				if err != io.EOF {
					log.Error("Error receiving message: %s\n", err.Error())
					c.shutdown()
				}
				return
			}
		}
	}
}

func (c *Core) doMessage() error {
	res, err := c.readResponse()
	if err != nil {
		return err
	}

	c.responseChanMutex.RLock()
	defer c.responseChanMutex.RUnlock()
	responseChan, ok := c.responseChannels[res.GetHeader(headers.ContentType)]
	if !ok && len(c.responseChannels) <= 0 {
		// We must have shutdown!
		return errors.New("no response channels")
	}

	// We have a handler
	if ok {
		// 仅允许 3 秒让处理程序在通道上接收消息
		ctx, cancel := context.WithTimeout(c.ctx, 3*time.Second)
		defer cancel()

		select {
		case responseChan <- res:
		case <-c.ctx.Done():
			// 父连接上下文已停止，很可能在等待处理程序处理消息的过程中关闭
			return c.ctx.Err()
		case <-ctx.Done():
			// 不要返回错误，因为这不是致命的，但要记录错误，因为它可能表明存在问题
			log.Warn("No one to handle response, Is the connection overloaded or stopping?\n%v\n\n", res)
		}
	} else {
		return errors.New("no response channel for Content-Type: " + res.GetHeader(headers.ContentType))
	}
	return nil
}

// 从 ESL 读取响应消息
func (c *Core) readResponse() (*response.RawResponse, error) {
	header, err := c.header.ReadMIMEHeader()
	if err != nil {
		return nil, err
	}
	res := &response.RawResponse{
		Headers: header,
	}

	if contentLength := header.Get(headers.ContentLength); len(contentLength) > 0 {
		length, err := strconv.Atoi(contentLength)
		if err != nil {
			return res, err
		}
		res.Body = make([]byte, length)
		if _, err = io.ReadFull(c.reader, res.Body); err != nil {
			return res, err
		}
	}
	return res, nil
}

func (c *Core) submitEvent(listener events.EventListener, event *events.Event) {
	_ = c.eventPool.Submit(func() {
		listener(event)
	})
}

// shutdown - Attempt to gracefully send FreeSWITCH "exit" over the ESL connection before closing our connection and stopping. Protected by a sync.Once
func (c *Core) shutdown(exit ...bool) {
	c.closeOnce.Do(func() {
		if exit != nil && exit[0] {
			/* 尝试正常关闭与 FreeSWITCH 的连接 */
			ctx, cancel := context.WithTimeout(c.ctx, c.exitTimeout)
			_, _ = c.send(ctx, command.Exit{})
			cancel()
			/* 延迟 以确保 disconnect-notice 事件获取 */
			time.Sleep(time.Millisecond)
		}

		// Allow users to do anything they need to do before we tear everything down
		c.stopFunc()
		c.responseChanMutex.Lock()
		defer c.responseChanMutex.Unlock()

		for key, responseChan := range c.responseChannels {
			close(responseChan)
			delete(c.responseChannels, key)
		}
		/* 释放事件池 */
		_ = c.eventPool.ReleaseTimeout(3 * time.Second)
		c.eventListener = nil
		/* 仅在锁定响应通道并删除所有响应通道以确保不会在关闭的通道上接收信息后才关闭连接 */
		_ = c.conn.Close()
	})
}
